home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Chip 2000 October
/
CHIP Turkiye Ekim 2000.iso
/
prog
/
naps
/
04
/
setup.exe
/
Gnucleus
/
GnuControl.cpp
< prev
next >
Wrap
C/C++ Source or Header
|
2000-07-15
|
28KB
|
1,097 lines
/********************************************************************************
Gnucleus - A node application for the Gnutella network
Copyright (C) 2000 John Marshall
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
For support, questions, comments, etc...
E-Mail:
swabby@c0re.net
Address:
21 Cadogan Way
Nashua, NH, USA 03062
********************************************************************************/
// GnuControl.cpp : implementation file
//
#include "stdafx.h"
#include "GnuControl.h"
#include "Gnucleus.h"
#include "GnucleusDoc.h"
#include "ViewSearch.h"
#include "GnuTransfer.h"
#include "GnuHash.h"
#include "GnuSock.h"
#ifdef _DEBUG
#define new DEBUG_NEW
#undef THIS_FILE
static char THIS_FILE[] = __FILE__;
#endif
IMPLEMENT_DYNCREATE(CGnuControl, CAsyncSocket)
/////////////////////////////////////////////////////////////////////////////
// CGnuControl
CGnuControl::CGnuControl()
{
m_Initialize();
}
CGnuControl::CGnuControl(CGnucleusDoc *DocLink)
{
m_Initialize();
Doc = DocLink;
}
void CGnuControl::m_Initialize (void)
{
NodeList = NULL;
TransferList = NULL;
CoCreateGuid(&ClientGUID);
BytesIn = 0;
BytesOut = 0;
totalPings = 0;
totalPongs = 0;
totalPushes = 0;
totalUnknowns = 0;
totalQueries = 0;
totalQueryReplies = 0;
totalTTLs = 0;
totalTTLsBig = 0;
totalHops = 0;
totalHopsBig = 0;
RunningThreads = 0;
m_dwBytesPerSecIn = m_dwBytesPerSecOut = 0;
m_dwTotalBytesIn = m_dwTotalBytesOut = 0;
// Create listen socket on random port 2,500 - 25,000
srand( (unsigned)time( NULL ) );
localPort = rand() % 22500 + 2500;
localHost = "127.0.0.1";
m_QueriesSearchThreadHandle = AfxBeginThread(CGnuControl::staticQueriesSearchThread, this, THREAD_PRIORITY_BELOW_NORMAL);
}
CGnuControl::~CGnuControl()
{
// indicate to the thread that we are exiting
m_QueriesSearchThreadHandle->m_bAutoDelete = FALSE;
m_EventQueriesEND.SetEvent();
// wait for the thread to exit
WaitForSingleObject(m_QueriesSearchThreadHandle->m_hThread, INFINITE);
delete m_QueriesSearchThreadHandle;
CGnuSock *deadNode = NULL;
while(NodeList != NULL)
{
deadNode = NodeList;
NodeList = NodeList->next;
delete deadNode;
}
CGnuTransfer *deadTransfer = NULL;
while(TransferList != NULL)
{
deadTransfer = TransferList;
TransferList = TransferList->next;
delete deadTransfer;
}
}
// Do not edit the following lines, which are needed by ClassWizard.
#if 0
BEGIN_MESSAGE_MAP(CGnuControl, CAsyncSocket)
//{{AFX_MSG_MAP(CGnuControl)
//}}AFX_MSG_MAP
END_MESSAGE_MAP()
#endif // 0
/////////////////////////////////////////////////////////////////////////////
// CGnuControl member functions
BOOL CGnuControl::AddNode(CString Host, CString Port)
{
DWORD dPort = atol(Port);
CGnuSock *newNode = new CGnuSock(this);
//Attempt to connect to node
newNode->Create();
if (!newNode->Connect(Host, dPort))
{ // DW This keeps from adding unreachable connections
DWORD dwError = newNode->GetLastError();
if (dwError != WSAEWOULDBLOCK)
{
delete(newNode);
return FALSE;
}
}
// Add the node to the linked list
if(NodeList != NULL)
newNode->next = NodeList;
NodeList = newNode;
newNode->Host = Host;
newNode->Port = Port;
return TRUE;
}
void CGnuControl::RemoveNode(CString Host, CString Port)
{
CGnuSock *pFront = NodeList;
CGnuSock *pBack = NULL;
CGnuSock *deadNode;
// While list isn't empty
while(pFront != NULL)
{
// If its at the beginning of the list
if(NodeList != NULL && pBack == NULL)
{
if(pFront->Host == Host && pFront->Port == Port)
{
deadNode = NodeList;
NodeList = NodeList->next;
pFront = NodeList;
deadNode->Close(); // need this to give resources back to the system
deadNode->CleanUp();
delete deadNode;
deadNode = NULL;
}
}
// If its at the middle or end of the list
else if(pFront != NULL && pFront->Host == Host && pFront->Port == Port)
{
deadNode = pFront;
pFront = deadNode->next;
pBack->next = deadNode->next;
deadNode->Close();
deadNode->CleanUp();
delete deadNode;
deadNode = NULL;
}
pBack = pFront;
if(pFront != NULL)
pFront = pFront->next;
}
}
CGnuSock* CGnuControl::GetNode(CString HostPort)
{
int index = HostPort.Find(":", 0);
CString Host = HostPort.Mid(0, index);
CString Port = HostPort.Mid(index + 1, HostPort.GetLength() - index);
CGnuSock *p = NodeList;
while(p != NULL)
{
if(p->Host == Host && p->Port == Port)
return p;
p = p->next;
}
return NULL;
}
void CGnuControl::OnAccept( int nErrorCode )
{
CGnuSock *newNode = new CGnuSock(this);
Accept(*newNode);
if(NodeList != NULL)
newNode->next = NodeList;
NodeList = newNode;
UINT trash;
newNode->GetPeerName(newNode->Host, trash);
newNode->Port = "Inbound";
/*
if(Doc->m_DropForIncoming || Doc->m_MonitorType == 0)
{
if(NodeList != NULL)
newNode->next = NodeList;
NodeList = newNode;
UINT trash;
newNode->GetPeerName(newNode->Host, trash);
newNode->Port = "Inbound";
}
else if(Doc->m_MonitorType == 3 ||
(Doc->m_MonitorType == 2 && Doc->Connections.GetCount() < Doc->m_ConnectNum) ||
(Doc->m_MonitorType == 1 && Doc->Connections.GetCount() < Doc->m_ConnectNum - 1))
{
if(NodeList != NULL)
newNode->next = NodeList;
NodeList = newNode;
UINT trash;
newNode->GetPeerName(newNode->Host, trash);
newNode->Port = "Inbound";
}
else
{
newNode->Close();
newNode->CleanUp();
delete newNode;
}*/
}
void CGnuControl::Pass_Raw(CGnuSock *exception, byte *packet, int length)
{
CGnuSock *p = NodeList;
while(p != NULL)
{
if(p != exception && p->Connected)
{
p->Send(packet, length);
BytesOut += length;
}
p = p->next;
}
}
void CGnuControl::Broadcast_Ping(packet_Ping *Ping, int length, CGnuSock *exception)
{
CGnuSock *p = NodeList;
while(p != NULL)
{
if(p != exception && p->Connected)
{
p->Send((byte *) Ping, length);
BytesOut += length;
}
p = p->next;
}
}
void CGnuControl::Broadcast_Query(packet_Query *Query, int length, CGnuSock *exception)
{
// NOTE: Could use some flood protection
CGnuSock *p = NodeList;
while(p != NULL)
{
if(p != exception && p->Connected)
{
p->Send((byte *) Query, length);
BytesOut += length;
}
p = p->next;
}
}
void CGnuControl::Route_Pong(packet_Pong *Pong, int length, key_data *key)
{
CGnuSock *p = NodeList;
while(p != NULL)
{
if(p == key->Origin && p->Connected)
{
p->Send((byte *) Pong, length);
BytesOut += length;
}
p = p->next;
}
}
void CGnuControl::Route_QueryReply(packet_QueryReply *QueryReply, int length, key_data *key)
{
CGnuSock *p = NodeList;
while(p != NULL)
{
if(p == key->Origin && p->Connected)
{
p->Send((byte *) QueryReply, length);
BytesOut += length;
}
p = p->next;
}
}
void CGnuControl::Route_Push(packet_Push *Push, int length, key_data *key)
{
CGnuSock *p = NodeList;
while(p != NULL)
{
if(p == key->Origin && p->Connected)
{
p->Send((byte *) Push, length);
BytesOut += length;
}
p = p->next;
}
}
void CGnuControl::StartListening()
{
CString Host, Port, Title;
int decimal, sign;
Close();
if(Create(localPort))
{
Listen();
Port = _fcvt(localPort, 0, &decimal, &sign);
Title = "Gnucleus on port " + Port;
AfxGetApp()->m_pMainWnd->ModifyStyle(FWS_ADDTOTITLE, 0);
AfxGetApp()->m_pMainWnd->SetWindowText(Title);
}
}
// File Transfer Part of the code
void CGnuControl::AddTransfer(QueryItem File, SOCKET sock)
{
CGnuTransfer *newTransfer = new CGnuTransfer(this);
if (!sock)
{
//Attempt to connect to node
newTransfer->Create();
}
else
{
newTransfer->Attach (sock);
}
// Add the transfer to the linked list
if(TransferList != NULL)
newTransfer->next = TransferList;
TransferList = newTransfer;
newTransfer->Handle = File.Handle;
newTransfer->Status = File.Status;
newTransfer->FileInfo = File;
newTransfer->Type = File.TransferType;
newTransfer->BytesCompleted = File.BytesCompleted;
newTransfer->FileSize = File.Size;
if (sock) // It's already connected and waiting for the GET request!
{
if(newTransfer->Type == 'D')
newTransfer->OnConnect (0);
if(newTransfer->Type == 'U')
{
/*
newTransfer->FileSize = File.Size;
newTransfer->BytesCompleted = File.BytesCompleted;
*/
}
}
}
void CGnuControl::RemoveTransfer(int row, char type)
{
CGnuTransfer *pFront = TransferList;
CGnuTransfer *pBack = NULL;
CGnuTransfer *deadTransfer;
// While list isn't empty
while(pFront != NULL)
{
// If its at the beginning of the list
if(TransferList != NULL && pBack == NULL)
{
if(pFront->Handle == row && pFront->Type == type)
{
deadTransfer = TransferList;
TransferList = TransferList->next;
pFront = TransferList;
deadTransfer->Close();
delete deadTransfer;
deadTransfer = NULL;
}
}
// If its at the middle or end of the list
else if(pFront != NULL && pFront->Handle == row && pFront->Type == type)
{
deadTransfer = pFront;
pFront = deadTransfer->next;
pBack->next = deadTransfer->next;
deadTransfer->Close();
delete deadTransfer;
deadTransfer = NULL;
}
pBack = pFront;
if(pFront != NULL)
pFront = pFront->next;
}
}
std::map <GUID, QueryItem, CompareGuids> g_pushList;
void CGnuControl::NewPushRequest(QueryItem File)
{
// ClientID's are in their own table
key_data *ClientID = HashClientTable.FindValue(&File.Guid);
// ClientID not found in table
if(ClientID == NULL)
return;
// Build the packet
packet_Push push;
::memset (&push, 0, sizeof (push));
::CoCreateGuid (&push.Header.Guid);
push.Header.Function = 0x40;
push.Header.TTL = 7;
push.Header.Hops = 0;
push.Header.Payload = flipX(makeX(26));
push.ClientID = ClientID->Guid;
push.Index = flipX(makeX(File.Index));
if( IPtoStr(Doc->m_ForceIP) != "0.0.0.0")
push.Host = Doc->m_ForceIP;
else
push.Host = StrtoIP(localHost);
push.Port = localPort;
try
{
ClientID->Origin->Send((byte *) &push, 23 + 26);
}
catch ( ... ) // catch the bad Origin pointer for now
{
return;
}
// Save the guid, the connecting host will send us it
GUID temp_guid;
temp_guid = push.ClientID;
temp_guid.Data1 ^= File.Index; // We can't use the straight guid
temp_guid.Data2 ^= (File.Index << 1); // just xor some of the values in the guid
g_pushList[temp_guid] = File;
}
CGnuTransfer* CGnuControl::GetTransfer(int row, char type)
{
CGnuTransfer *pos = TransferList;
while(pos != NULL)
{
if(pos->Handle == row && pos->Type == type)
return pos;
pos = pos->next;
}
return NULL;
}
DWORD CGnuControl::GetHostCount()
{
DWORD Total = 0;
CGnuSock *p = NodeList;
while(p != NULL)
{
if(p->Connected)
Total += p->TotalRemoteHosts() + 1;
p = p->next;
}
return Total;
}
////////////////////////////////////////////////////////////
//! author="Nathan Brown"
//
//: Pushes query request to queue for thread.
// Use this for variable from CGnuSock object
void CGnuControl::PushQueryToQueue(CGnuSock * _lp_sock)
{
// protect query list from access when adding
bool bSucess = true;
SearchQueryItem search_item;
if(! AfxIsValidAddress( _lp_sock, sizeof(CGnuSock)))
{
// ASSERT(FALSE);
return;
}
try // don't trust pointers
{
search_item.ConnectedSock = _lp_sock;
search_item.SearchString = _lp_sock->CurrentSearch;
search_item.ClientGuid = _lp_sock->GnuComm->ClientGUID;
search_item.Origin = _lp_sock->CurrentOrigin;
search_item.Host = StrtoIP(_lp_sock->GnuComm->localHost);
search_item.localPort = _lp_sock->GnuComm->localPort;
search_item.SearchString.MakeLower();
// Lock, push, and unlock
m_QueryListCriticalSection.Lock();
m_SearchQueries.push_back(search_item);
m_QueryListCriticalSection.Unlock();
}
catch ( ... ) // bad pointer
{
// failed
bSucess = false;
}
// let the thread know that there is a item
if(bSucess)
m_EventQueriesWaiting.SetEvent();
}
////////////////////////////////////////////////////////////
//! author="Nathan Brown"
//
//: Pops query off of queue for thread to start search
bool CGnuControl::PopQueryFromQueue(SearchQueryItem & _search_item)
{
bool bResult = true;
if( m_SearchQueries.empty())
return false;
else
{
// protect query list from access when removing
m_QueryListCriticalSection.Lock();
SearchQueryItem * search_item = &m_SearchQueries.front();
// copy the items to the waiting struct
_search_item.ConnectedSock = search_item->ConnectedSock;
_search_item.SearchString = search_item->SearchString;
_search_item.ClientGuid = search_item->ClientGuid;
_search_item.Origin = search_item->Origin;
_search_item.Host = search_item->Host;
_search_item.localPort = search_item->localPort;
// remove from stack
m_SearchQueries.pop_front();
// release the lock.
m_QueryListCriticalSection.Unlock();
return bResult;
}
}
struct query_search_word
{
query_search_word(){};
query_search_word(DWORD _query_num, CString & _searchWord)
: query_num(_query_num), searchWord(_searchWord) {};
DWORD query_num;
CString searchWord;
};
struct query_result_match
{
query_result_match(DWORD _query_num, QueryResult _query_result)
: query_num(_query_num), query_result(_query_result) {};
DWORD query_num;
QueryResult query_result;
};
////////////////////////////////////////////////////////////
//! author="Nathan Brown"
//
//: Thread for all connections to search for matches to queries
UINT CGnuControl::staticQueriesSearchThread(LPVOID _lpv_control)
{
// Call class member so it's localized.
return ((CGnuControl *)_lpv_control)->QueriesSearchThread();
}
// localized class thread
UINT CGnuControl::QueriesSearchThread()
{
SearchQueryItem query_item_temp;
query_item_temp.bIsMatch = 0;
// rejected queries for a file
bool b_table_not_current_file[50];
DWORD dw_table_num_of_matches[50];
DWORD i_num_queries_at_once = 0;
// for before splitting up among queries
std::vector <query_result_match> query_match;
// result of one query
std::queue <QueryResult> resultQueue;
query_search_word query_word_temp;
std::vector<SharedFile>::iterator itName;
std::vector<DWORD>::iterator itSize;
std::vector<SearchQueryItem>::iterator it_queries_vector;
std::vector<query_search_word>::iterator it_query_search;
// Split up the search string once so we don't do it for every file
std::vector<query_search_word> searchWords;
// vector for matching results to who requested it.
std::vector<SearchQueryItem> queries_vector;
DWORD hits = 0;
// for multilock
CSyncObject * query_events[2] = { &m_EventQueriesEND ,
&m_EventQueriesWaiting }; // order does matter
CMultiLock query_events_multi_lock(query_events, 2, FALSE);
// *start loop
for(;;)
{
// *check semaphore for any jobs waiting, wait until there is one
// *also make sure we arn't supposed to be shutting down.
if(query_events_multi_lock.Lock(INFINITE, FALSE, 0) == WAIT_OBJECT_0)
// Queries end was called, exit.
return 0; // simple as that
// else go on to handle a search event
m_EventQueriesWaiting.ResetEvent();
// *check query queue, pull items off.
while( PopQueryFromQueue(query_item_temp) && i_num_queries_at_once < 50)
{
query_word_temp.query_num = i_num_queries_at_once;
queries_vector.push_back(query_item_temp);
++i_num_queries_at_once;
// we arn't using these delemiters, so convert to one we are.
query_item_temp.SearchString.Replace("*", " ");
query_item_temp.SearchString.Replace("+", " ");
// *split strings into seperate items (delemited by space)
// *parse string and put into search vector.
while ( !query_item_temp.SearchString.IsEmpty() )
{
int idq = query_item_temp.SearchString.Find ("\"");
int idx = query_item_temp.SearchString.Find (" ");
if (idx != -1)
{
if (idx > 0)
{
query_word_temp.searchWord = query_item_temp.SearchString.Left (idx);
searchWords.push_back (query_word_temp);
}
query_item_temp.SearchString = query_item_temp.SearchString.Mid (idx+1);
}
else
{
query_word_temp.searchWord = query_item_temp.SearchString;
searchWords.push_back (query_word_temp);
break;
}
}
}
// *check for file table lock, wait until available
Doc->LockUploadData (false);
if(! Doc->SharedFiles.empty())
{
// *search file table
for(int ia = 0; ia < i_num_queries_at_once; ++ia)
dw_table_num_of_matches[ia] = 0;
// Determine if we have any files that match the search
DWORD loop;
for(loop = 0, itName = Doc->SharedFiles.begin(), itSize = Doc->SharedSizes.begin();
itName != Doc->SharedFiles.end(), itSize != Doc->SharedSizes.end();
loop++, itName++, itSize++)
{
CString CompareFile( (*itName).FileDir );
//CString FileToSend = CompareFile.Mid( CompareFile.ReverseFind('\\') + 1);
// CString FileToSend = (*itName).FileName;
for(int ib = 0; ib < i_num_queries_at_once; ++ib)
{
b_table_not_current_file[ib] = false;
if( dw_table_num_of_matches[ib] >
(Doc->m_MaxReplies ? Doc->m_MaxReplies : 563)) // limit replies anyways.
b_table_not_current_file[ib] = true; // quick hack for now. probably remove the file from the list later.
}
DWORD length = 50;
// break directory off of compare string
int slashes = 0;
for(int i = CompareFile.GetLength() - 1; i > 0 && slashes < 2; i--)
if( CompareFile.GetAt(i) == '\\' )
slashes++;
CompareFile = CompareFile.Mid(i + 2, CompareFile.GetLength() - i);
int Found=0;
// look for matches to the words
for(it_query_search = searchWords.begin (); it_query_search != searchWords.end (); ++it_query_search)
{
if( b_table_not_current_file[(*it_query_search).query_num] == false
&& CompareFile.Find ((*it_query_search).searchWord) != -1)
{
Found++;
}
else
{
// this query is rejected
b_table_not_current_file[(*it_query_search).query_num] = true;
}
}
if(Found)
{
// add to result queue if there is a match for a query
for(DWORD i = 0; i < i_num_queries_at_once; ++i)
{
if ( b_table_not_current_file[i] == false )
{
// add to result queue
query_match.push_back( query_result_match(i, QueryResult (loop, *itSize, (*itName).FileName ) ) );
// indicate match
queries_vector[i].bIsMatch = true;
// increment the counters for number of results
dw_table_num_of_matches[i]++;
}
}
}
}
}
// *build replies per query
std::vector<query_result_match>::iterator it;
int i_count = 0;
if(! queries_vector.empty())
{
for( it_queries_vector = queries_vector.begin(); it_queries_vector != queries_vector.end(); it_queries_vector++)
{
// is signaled as having results?
if( (*it_queries_vector).bIsMatch )
{
for(it = query_match.begin(); it != query_match.end(); it++)
{
if( (*it).query_num == i_count )
{
// add result to queue to send
QueryResult query_item_temp_ = ((*it).query_result);
resultQueue.push( query_item_temp_ );
}
}
// Send results - We don't need to try and catch this because the SendFileResults function does that for us.
(*it_queries_vector).ConnectedSock->SendFileResults(resultQueue, (*it_queries_vector).Origin );
// empty the queue incase it wasn't already done
while(! resultQueue.empty())
resultQueue.pop();
}
i_count++;
}
}
// unlock table database, we are done with it.
Doc->UnlockUploadData (false);
// *clear various items.
i_num_queries_at_once = 0;
query_match.clear();
searchWords.clear();
queries_vector.clear();
hits = 0;
// *repeate and wait for next item.
}
return 0;
}
////////////////////////////////////////////////////////////
//! author="Nathan Brown"
//
//: Updates various aspects of each sock and manages bandwidth limiting.
void CGnuControl::OnTimer()
{
// * add up all the in and out going traffic, compair to the last saved amount
// * and calculate a bit rate.
DWORD dw_bytes_in = 0, dw_bytes_out = 0;
int num_transfers = 0, num_nodes = 0;
float balance_transfers_at_percent = (.5) ;
CGnuSock *p_soc = NodeList;
CGnuTransfer *p_tns = TransferList;
for(p_soc = NodeList; p_soc != NULL;)
{
num_nodes++;
dw_bytes_in += p_soc->m_dwBytesIn;
p_soc->m_dwTotalBytesIn += p_soc->m_dwBytesIn;
dw_bytes_out += p_soc->m_dwBytesOut;
p_soc->m_dwTotalBytesOut += p_soc->m_dwBytesOut;
p_soc = p_soc->next;
}
for(p_tns = TransferList; p_tns != NULL;)
{
num_transfers++;
dw_bytes_in += p_tns->m_dwBytesIn;
p_tns->m_dwTotalBytesIn += p_tns->m_dwBytesIn;
dw_bytes_out += p_tns->m_dwBytesOut;
p_tns->m_dwTotalBytesOut += p_tns->m_dwBytesOut;
p_tns = p_tns->next;
}
// calculate bit rate, weighted 1:2 to the previous sample
m_dwBytesPerSecIn = (dw_bytes_in + m_dwBytesPerSecIn*2) / 3;
m_dwBytesPerSecOut = (dw_bytes_out + m_dwBytesPerSecOut*2) / 3;
if(num_nodes)
{
int extra_from_last_cycle = 0;
DWORD dw_bytes_out_per_soc = 0;
DWORD dw_bytes_out_per_tns = 0;
DWORD dw_bytes_out_per_thread_plus = 0;
DWORD dw_bytes_in_per_soc = 0;
DWORD dw_bytes_in_per_tns = 0;
DWORD dw_bytes_in_per_thread_plus = 0;
DWORD dw_bytes_out_total = 0;
DWORD dw_bytes_in_total = 0;
// * slice up an in/out allowance among the sockets.
if(Doc->m_LimitTotal)
{
dw_bytes_out_total = Doc->m_LimitTotal * 1024;
dw_bytes_in_total = dw_bytes_out_total - dw_bytes_in;
extra_from_last_cycle = Doc->m_LimitTotal * 1024 / 2 - dw_bytes_out; // get any unused portion from the last time slice
extra_from_last_cycle = extra_from_last_cycle > 0 ? extra_from_last_cycle : 0; // keep it from being negative
dw_bytes_out_per_soc = (Doc->m_LimitTotal * 1024 / (num_nodes * 2)) + extra_from_last_cycle; // limit each sock to just a fraction
dw_bytes_out_per_soc -= num_transfers ? dw_bytes_out_per_soc * balance_transfers_at_percent : 0; // keep a good chunk for transfers
dw_bytes_in_per_soc = dw_bytes_out_per_soc;
dw_bytes_out_per_tns = balance_transfers_at_percent * Doc->m_LimitTotal * 1024 / (num_nodes * 2); // limit each sock to just a fraction
dw_bytes_in_per_tns = dw_bytes_out_per_soc;
}
if(Doc->m_LimitUp)
{
dw_bytes_out_total = Doc->m_LimitUp * 1024 - (Doc->m_LimitTotal ? dw_bytes_in / 2 : 0);
extra_from_last_cycle = Doc->m_LimitUp * 1024 - dw_bytes_out; // get any unused portion from the last time slice
extra_from_last_cycle = extra_from_last_cycle > 0 ? extra_from_last_cycle : 0; // keep it from being negative
dw_bytes_out_per_soc = (Doc->m_LimitUp * 1024 / num_nodes) + extra_from_last_cycle; // limit each sock to just a fraction
dw_bytes_out_per_soc -= num_transfers ? dw_bytes_out_per_soc * balance_transfers_at_percent : 0; // keep a good chunk for transfers
dw_bytes_out_per_tns = balance_transfers_at_percent * Doc->m_LimitUp * 1024 / num_nodes; // limit each sock to just a fraction
}
dw_bytes_out_per_thread_plus = (DWORD)((double)(dw_bytes_out_per_soc) * 0.20); // add a buffer in case a sock need to catch up.
dw_bytes_in_per_thread_plus = (DWORD)((double)(dw_bytes_in_per_soc) * 0.20); // add a buffer in case a sock need to catch up.
for(p_soc = NodeList; p_soc != NULL;)
{
// if this thread is hitting it's allotment, pull it back.
if( p_soc->m_dwBytesOut >= p_soc->m_dwByteAllottmentOut)
p_soc->m_dwByteAllottmentOut = dw_bytes_out_per_soc;
else
p_soc->m_dwByteAllottmentOut = dw_bytes_out_per_thread_plus;
// same for incomming traffic
if( p_soc->m_dwBytesIn >= p_soc->m_dwByteAllottmentIn)
p_soc->m_dwByteAllottmentIn = dw_bytes_in_per_soc;
else
p_soc->m_dwByteAllottmentIn = dw_bytes_in_per_thread_plus;
// weed any getting way behind in their queue
if(p_soc->m_iSizeOfPacketQueue > 100)
{
CString Host = p_soc->Host;
CString Port = p_soc->Port;
p_soc = p_soc->next; // avoid trying to access the pointer to get next after it's gone
RemoveNode(Host, Port);
if(Doc->m_ConnectNum > 2) // see if we can lower the target amount of clients
Doc->m_ConnectNum--;
}
else
p_soc = p_soc->next;
}
dw_bytes_out_per_thread_plus = (DWORD)((double)(dw_bytes_out_per_tns) * 0.20); // add a buffer in case a sock need to catch up.
dw_bytes_in_per_thread_plus = (DWORD)((double)(dw_bytes_in_per_tns) * 0.20); // add a buffer in case a sock need to catch up.
for(p_tns = TransferList; p_tns != NULL;)
{
// if this thread is hitting it's allotment, give it a little more slack
if( dw_bytes_out_total >= p_tns->m_dwByteAllottmentOut
&& dw_bytes_out_total > dw_bytes_out)
p_tns->m_dwByteAllottmentOut += (dw_bytes_out_total - dw_bytes_out) * .20; // increase by 20% of the difference
else if(dw_bytes_out_total < dw_bytes_out) // it's way over, drop it down
p_tns->m_dwByteAllottmentOut = dw_bytes_out_per_thread_plus;
else // lower bar a little.
{
p_tns->m_dwByteAllottmentOut -= (dw_bytes_out_total - dw_bytes_out) * .20;
if(p_tns->m_dwByteAllottmentOut < dw_bytes_out_per_tns) // oops, too much
p_tns->m_dwByteAllottmentOut = dw_bytes_out_per_thread_plus;
}
// same for incomming traffic
if( p_tns->m_dwBytesIn >= p_tns->m_dwByteAllottmentIn)
p_tns->m_dwByteAllottmentIn = dw_bytes_in_per_soc;
else
p_tns->m_dwByteAllottmentIn = dw_bytes_in_per_thread_plus;
p_tns = p_tns->next;
}
// * redistribute any allowance left over from slow socks.
// * check for any socks over allotting their share, and how many total
// * connections there, are, and deside whether to drop on or not.
// * alternativly, filter packets to lower bandwidth consumtion.
}
for(p_soc = NodeList; p_soc != NULL;)
{
p_soc->m_dwBytesIn = 0;
p_soc->m_dwBytesOut = 0;
// give a chance to proccess more of the packets in the qeue
p_soc->ProccessPacketQueue();
p_soc = p_soc->next;
}
for(p_tns = TransferList; p_tns != NULL;)
{
p_tns->m_dwBytesIn = 0;
p_tns->m_dwBytesOut = 0;
p_tns = p_tns->next;
}
}